using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Rhino.Etl.Core;
using Rhino.Etl.Core.Operations;

namespace Sqloogle.Operations.Support {

    /// <summary>
    /// Combines rows from all operations.
    /// </summary>
    public class UnionAllOperationParallel : AbstractOperation {

        private readonly List<IOperation> _operations = new List<IOperation>();

        public UnionAllOperationParallel() {}

        public UnionAllOperationParallel(IEnumerable<IOperation> ops) {
            _operations.AddRange(ops);
        }

        /// <summary>
        /// Executes the added operations in parallel.
        /// Warning: Errors in the operations may make this operation hang,
        /// use UnionAllOperationSerial instead to debug the errors.
        /// </summary>
        /// <param name="rows"></param>
        /// <returns></returns>
        public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {

            var blockingCollection = new BlockingCollection<Row>();

            Debug("Creating tasks for {0} operations.", _operations.Count);

            var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() => {
                Trace("Executing {0} operation.", currentOp.Name);
                foreach (var row in currentOp.Execute(null)) {
                    blockingCollection.Add(row);
                }
                blockingCollection.Add(null); // free the consumer thread
            })).ToArray();

            Row r;
            while (true) {
                if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) {
                    Debug("All tasks have been canceled, have faulted, or have completed.");
                    break;
                }

                r = blockingCollection.Take();
                if (r == null)
                    continue;

                yield return r;

            }

            while (blockingCollection.TryTake(out r)) {
                if (r == null)
                    continue;
                yield return r;
            }

            Task.WaitAll(tasks); // raise any exception that were raised during execption

        }

        /// <summary>
        /// Initializes this instance
        /// </summary>
        /// <param name="pipelineExecuter">The current pipeline executer.</param>
        public override void PrepareForExecution(IPipelineExecuter pipelineExecuter) {
            foreach (var operation in _operations) {
                operation.PrepareForExecution(pipelineExecuter);
            }
        }

        /// <summary>
        /// Add operation parameters
        /// </summary>
        /// <param name="ops">operations delimited by commas</param>
        /// <returns></returns>
        public UnionAllOperationParallel Add(params IOperation[] ops) {
            _operations.AddRange(ops);
            return this;
        }

        /// <summary>
        /// Add operations
        /// </summary>
        /// <param name="ops">an enumerable of operations</param>
        /// <returns></returns>
        public UnionAllOperationParallel Add(IEnumerable<IOperation> ops) {
            _operations.AddRange(ops);
            return this;
        }

    }
}
